package com.lkd.job;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.lkd.config.TopicConfig;
import com.lkd.contract.SupplyChannel;
import com.lkd.contract.SupplyContract;
import com.lkd.dao.ChannelDao;
import com.lkd.emq.MqttProducer;
import com.lkd.entity.ChannelEntity;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author brianxia
 * @version 1.0
 * @date 2023-02-11 9:18
 */
@Component
public class AutoSupplyVmJob {

    @Autowired
    private ChannelDao channelDao;
    @Autowired
    private MqttProducer mqttProducer;

    /**
     * 自动扫描需要补货的售货机
     */
    @XxlJob("autoSupplyVm")
    public void autoSupplyVm() throws JsonProcessingException {
        //1.获取需要补货的货道  and vm.id % 总数  =  当前服务实例编号
        int shardTotal = XxlJobHelper.getShardTotal();
        int shardIndex = XxlJobHelper.getShardIndex();
        List<ChannelEntity> channelsNeedToSupply = channelDao.getChannelsNeedToSupply(shardTotal,shardIndex);

        if(CollectionUtils.isEmpty(channelsNeedToSupply)){
            return;
        }
        //1.1获取所有售货机inner_code
        List<String> innerCodes =
                channelsNeedToSupply.stream().map(ChannelEntity::getInnerCode).distinct().collect(Collectors.toList());

        for (String innerCode : innerCodes) {
            //2.生成协议对象
            SupplyContract supplyContract = new SupplyContract();
            //遍历每一台售货机
            List<SupplyChannel> supplyChannels = channelsNeedToSupply.stream().filter(x -> x.getInnerCode().equals(innerCode))
                    .map(x -> {
                        SupplyChannel supplyChannel = new SupplyChannel();
                        supplyChannel.setChannelCode(x.getChannelCode());
                        supplyChannel.setCapacity(x.getMaxCapacity() - x.getCurrentCapacity());
                        supplyChannel.setSkuId(x.getSkuId());
                        if (x.getSku() != null) {
                            supplyChannel.setSkuName(x.getSku().getSkuName());
                            supplyChannel.setSkuImage(x.getSku().getSkuImage());
                        }
                        return supplyChannel;
                    }).collect(Collectors.toList());

            supplyContract.setSupplyData(supplyChannels);
            supplyContract.setInnerCode(innerCode);

            //3.发送消息
            mqttProducer.send(TopicConfig.TASK_SUPPLY_AUTOTOPIC,1,supplyContract);
        }

    }

    public static void main(String[] args) {
        ArrayList<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        list.add(1);
        List<Integer> collect = list.stream().distinct().collect(Collectors.toList());
        System.out.println(collect);
    }
}
